In [1]:
import os
import cv2
import matplotlib.pyplot as plt
import numpy as np
# Function to detect shadows using intensity comparison
def detect_shadows(image, threshold_value=50):
    """Return a binary shadow mask for an RGB image.

    Pixels whose grayscale intensity falls BELOW ``threshold_value`` are
    treated as shadow and set to 255 in the mask; everything else is 0.
    Downstream code (the hazard-map cell) assumes 255 == shadow, so
    THRESH_BINARY_INV is used here: plain THRESH_BINARY would mark the
    *bright* regions instead, inverting the mask's meaning.

    Parameters
    ----------
    image : np.ndarray
        RGB image of shape (H, W, 3), uint8.
    threshold_value : int, optional
        Intensity cut-off separating shadow from lit pixels (default 50,
        the constant the original hard-coded).

    Returns
    -------
    np.ndarray
        uint8 mask of shape (H, W), 255 where shadow was detected.
    """
    # Convert the image to grayscale
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    # Inverted threshold: dark (shadow) pixels become 255
    _, thresh = cv2.threshold(gray, threshold_value, 255, cv2.THRESH_BINARY_INV)
    return thresh
# Directory containing the image dataset
tmc_images_directory = 'cbpm.v2i.coco/train/'

# Every regular file in the directory — this includes the COCO annotation
# JSON, which fails to load as an image and is skipped in the loop below.
tmc_image_files = [f for f in os.listdir(tmc_images_directory)
                   if os.path.isfile(os.path.join(tmc_images_directory, f))]

# Process images and detect shadows.  Keep each loaded RGB image together
# with its mask: the original kept parallel lists (tmc_image_files and
# shadow_masks) that fell out of alignment whenever a file failed to load
# and was skipped, so image i was displayed next to the wrong mask.
shadow_results = []  # list of (rgb_image, shadow_mask) pairs
for img_file in tmc_image_files:
    img_path = os.path.join(tmc_images_directory, img_file)
    image = cv2.imread(img_path)
    if image is None:
        print(f"Failed to load image at {img_path}")
        continue
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    shadow_results.append((image, detect_shadows(image)))

# Keep `shadow_masks` for later cells that consume it
shadow_masks = [mask for _, mask in shadow_results]

# Display sample images with shadow masks (never more than we actually have)
num_samples = min(5, len(shadow_results))
plt.figure(figsize=(15, 6))
for i in range(num_samples):
    rgb_image, shadow_mask = shadow_results[i]
    plt.subplot(2, num_samples, i + 1)
    # Reuse the already-loaded image instead of re-reading it from disk
    plt.imshow(rgb_image)
    plt.title('Original Image')
    plt.axis('off')
    plt.subplot(2, num_samples, num_samples + i + 1)
    plt.imshow(shadow_mask, cmap='gray')
    plt.title('Shadow Mask')
    plt.axis('off')
plt.tight_layout()
plt.show()
Failed to load image at cbpm.v2i.coco/train/_annotations.coco.json
In [2]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import torch
import torchvision
from torchvision import datasets, models
from torchvision.transforms import functional as FT
from torchvision import transforms as T
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, sampler, random_split, Dataset
import copy
import math
from PIL import Image
import cv2
import albumentations as A # our data augmentation library
import matplotlib.pyplot as plt
%matplotlib inline
In [3]:
# Suppress warnings for cleaner notebook output (optional)
import warnings
warnings.filterwarnings("ignore")
from collections import defaultdict, deque
import datetime
import time
from tqdm import tqdm # progress bar
from torchvision.utils import draw_bounding_boxes
In [4]:
# Sanity-check the installed torch / torchvision versions
print(torch.__version__)
print(torchvision.__version__)
2.1.2+cu121 0.16.2+cu121
In [5]:
# our dataset is in cocoformat, we will need pypcoco tools
!pip install pycocotools
from pycocotools.coco import COCO
Defaulting to user installation because normal site-packages is not writeable Requirement already satisfied: pycocotools in ./.local/lib/python3.10/site-packages (2.0.7) Requirement already satisfied: matplotlib>=2.1.0 in /usr/lib/python3/dist-packages (from pycocotools) (3.5.1) Requirement already satisfied: numpy in ./.local/lib/python3.10/site-packages (from pycocotools) (1.26.2)
In [6]:
# Now, we will define our transforms
from albumentations.pytorch import ToTensorV2
In [7]:
def get_transforms(train=False):
    """Build the albumentations pipeline for this dataset.

    Parameters
    ----------
    train : bool, optional
        When True, random augmentations (flips, brightness/contrast,
        color jitter) are inserted between the resize and the tensor
        conversion; when False only resize + ToTensorV2 are applied.

    Returns
    -------
    A.Compose
        Pipeline expecting COCO-format ``bboxes`` ([x, y, w, h, class_id]).
    """
    # The resize head and ToTensorV2 tail are shared; only the augmentations
    # differ, so build one list instead of duplicating the whole Compose.
    steps = [A.Resize(600, 600)]  # our input size can be 600px
    if train:
        steps += [
            A.HorizontalFlip(p=0.3),
            A.VerticalFlip(p=0.3),
            A.RandomBrightnessContrast(p=0.1),
            A.ColorJitter(p=0.1),
        ]
    steps.append(ToTensorV2())
    return A.Compose(steps, bbox_params=A.BboxParams(format='coco'))
In [8]:
class AquariumDetection(datasets.VisionDataset):
    """COCO-format detection dataset yielding (image/255, target) pairs.

    `target` follows the torchvision detection convention: a dict with
    'boxes' (xyxy float32), 'labels', 'image_id', 'area' and 'iscrowd'.
    """

    def __init__(self, root, split='train', transform=None, target_transform=None, transforms=None):
        # the 3 transform parameters are required by datasets.VisionDataset
        super().__init__(root, transforms, transform, target_transform)
        self.split = split  # train, valid, test
        # annotations are stored alongside the images of each split
        self.coco = COCO(os.path.join(root, split, "_annotations.coco.json"))
        self.ids = list(sorted(self.coco.imgs.keys()))
        # drop images with no annotations: the detection loss needs boxes
        self.ids = [id for id in self.ids if (len(self._load_target(id)) > 0)]

    def _load_image(self, id: int):
        """Load image `id` from disk and return it as an RGB numpy array."""
        path = self.coco.loadImgs(id)[0]['file_name']
        image = cv2.imread(os.path.join(self.root, self.split, path))
        if image is None:
            # fail loudly here instead of crashing inside cvtColor below
            raise FileNotFoundError(
                f"Could not read image {path!r} in split {self.split!r}")
        return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    def _load_target(self, id):
        """Return the raw COCO annotation dicts for image `id`."""
        return self.coco.loadAnns(self.coco.getAnnIds(id))

    def __getitem__(self, index):
        id = self.ids[index]
        image = self._load_image(id)
        # deepcopy so augmentations cannot mutate pycocotools' cached
        # annotations (the original loaded the target twice and discarded
        # the first result; one load is enough)
        target = copy.deepcopy(self._load_target(id))

        # albumentations' coco format wants [x, y, w, h, class_id] per box
        boxes = [t['bbox'] + [t['category_id']] for t in target]
        if self.transforms is not None:
            transformed = self.transforms(image=image, bboxes=boxes)
            image = transformed['image']
            boxes = transformed['bboxes']

        # convert from xywh to xyxy
        new_boxes = []
        for box in boxes:
            xmin = box[0]
            xmax = xmin + box[2]
            ymin = box[1]
            ymax = ymin + box[3]
            new_boxes.append([xmin, ymin, xmax, ymax])
        # reshape keeps shape (0, 4) if the transform dropped every box,
        # so the slicing in the area computation below cannot IndexError
        boxes = torch.tensor(new_boxes, dtype=torch.float32).reshape(-1, 4)

        targ = {}  # the transformed target
        targ['boxes'] = boxes
        targ['labels'] = torch.tensor([t['category_id'] for t in target], dtype=torch.int64)
        targ['image_id'] = torch.tensor([t['image_id'] for t in target])
        # area recomputed from the (possibly resized) boxes
        targ['area'] = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        targ['iscrowd'] = torch.tensor([t['iscrowd'] for t in target], dtype=torch.int64)
        return image.div(255), targ  # scale images to [0, 1]

    def __len__(self):
        return len(self.ids)
In [9]:
# Root of the COCO-format dataset (contains the train/valid/test splits)
dataset_path = "cbpm.v2i.coco/"
In [10]:
# Load classes from the train split's annotation file
coco = COCO(os.path.join(dataset_path, "train", "_annotations.coco.json"))
categories = coco.cats  # dict: category_id -> {'id', 'name', 'supercategory'}
n_classes = len(categories.keys())
categories  # last expression: rich-display the category mapping
loading annotations into memory... Done (t=0.00s) creating index... index created!
Out[10]:
{0: {'id': 0, 'name': 'satelites', 'supercategory': 'none'},
1: {'id': 1, 'name': 'Asteroid', 'supercategory': 'satelites'},
2: {'id': 2, 'name': 'Damage', 'supercategory': 'satelites'},
3: {'id': 3, 'name': 'Debris', 'supercategory': 'satelites'},
4: {'id': 4, 'name': 'Explosion', 'supercategory': 'satelites'},
5: {'id': 5, 'name': 'Satelite', 'supercategory': 'satelites'}}
In [11]:
# Class names in category-id order (dict preserves insertion order, which
# matches the sorted integer keys of coco.cats)
classes = [category['name'] for category in categories.values()]
classes
Out[11]:
['satelites', 'Asteroid', 'Damage', 'Debris', 'Explosion', 'Satelite']
In [12]:
# Training dataset with the augmentation pipeline enabled
train_dataset = AquariumDetection(root=dataset_path, transforms=get_transforms(True))
loading annotations into memory... Done (t=0.00s) creating index... index created!
In [13]:
num_samples_to_display = 5  # Set the number of samples to display
fig, axes = plt.subplots(1, num_samples_to_display, figsize=(20, 10))
for i in range(num_samples_to_display):
    sample = train_dataset[i]  # Access each sample from the dataset
    # draw_bounding_boxes wants uint8; (img * 255).to(...) avoids the
    # "copy construct from a tensor" warning torch.tensor(tensor) raises
    img_int = (sample[0] * 255).to(torch.uint8)
    # Plot each image with its bounding boxes and class names
    # (loop variable renamed `lbl` so it no longer shadows the outer `i`)
    axes[i].imshow(draw_bounding_boxes(
        img_int, sample[1]['boxes'], [classes[lbl] for lbl in sample[1]['labels']], width=4
    ).permute(1, 2, 0))
    axes[i].axis('off')  # Turn off axis labels
plt.tight_layout()
plt.show()
In [14]:
# Number of training images that carry at least one annotation
len(train_dataset)
Out[14]:
918
In [15]:
# Load a Faster R-CNN detector pre-trained on COCO.  torchvision 0.16
# (the version this notebook runs, see the version cell) deprecates
# `pretrained=True` in favor of the `weights` argument.
model = models.detection.fasterrcnn_mobilenet_v3_large_fpn(weights="DEFAULT")
in_features = model.roi_heads.box_predictor.cls_score.in_features  # we need to change the head
# Replace the classification head so it predicts our n_classes categories
model.roi_heads.box_predictor = models.detection.faster_rcnn.FastRCNNPredictor(in_features, n_classes)
In [16]:
def collate_fn(batch):
    """Transpose a list of (image, target) samples into (images, targets) tuples.

    Detection samples have variable-size tensors, so the default stacking
    collate cannot be used; this simply regroups the batch column-wise.
    """
    columns = zip(*batch)
    return tuple(columns)
In [17]:
# Batches are lists of variable-size samples, hence the custom collate_fn
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=4, collate_fn=collate_fn)
In [18]:
# Smoke test: push one batch through the model (on CPU) to confirm the
# dataset's output format matches what the detection model expects
images,targets = next(iter(train_loader))
images = list(image for image in images)
targets = [{k:v for k, v in t.items()} for t in targets]
output = model(images, targets) # just make sure this runs without error
In [19]:
# Use the GPU when available; fall back to CPU so the notebook still runs
# (slowly) on machines without CUDA instead of raising at .to(device).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
In [20]:
# Now, the optimizer: SGD with Nesterov momentum over trainable params only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.01, momentum=0.9, nesterov=True, weight_decay=1e-4)
# lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[16, 22], gamma=0.1) # lr scheduler
In [21]:
import sys
In [22]:
def train_one_epoch(model, optimizer, loader, device, epoch):
    """Train `model` for one epoch over `loader` and print the mean losses.

    Parameters
    ----------
    model : torchvision detection model; computes its own loss dict when
        called with targets in train mode.
    optimizer : torch.optim.Optimizer
    loader : DataLoader yielding (images, targets) batches (see collate_fn).
    device : torch.device to run on.
    epoch : int, used only for the progress printout.

    Exits the process via sys.exit(1) if the loss diverges to non-finite.
    """
    model.to(device)
    model.train()

    all_losses = []
    all_losses_dict = []

    for images, targets in tqdm(loader):
        images = list(image.to(device) for image in images)
        # target values are already tensors: v.to(device) moves them without
        # the "copy construct from a tensor" warning torch.tensor(v) raises
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # the model computes the loss automatically if we pass in targets
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())
        loss_dict_append = {k: v.item() for k, v in loss_dict.items()}
        loss_value = losses.item()

        all_losses.append(loss_value)
        all_losses_dict.append(loss_dict_append)

        # abort the run if loss becomes infinity / NaN
        if not math.isfinite(loss_value):
            print(f"Loss is {loss_value}, stopping training")
            print(loss_dict)
            sys.exit(1)

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

    all_losses_dict = pd.DataFrame(all_losses_dict)  # for printing
    print("Epoch {}, lr: {:.6f}, loss: {:.6f}, loss_classifier: {:.6f}, loss_box: {:.6f}, loss_rpn_box: {:.6f}, loss_object: {:.6f}".format(
        epoch, optimizer.param_groups[0]['lr'], np.mean(all_losses),
        all_losses_dict['loss_classifier'].mean(),
        all_losses_dict['loss_box_reg'].mean(),
        all_losses_dict['loss_rpn_box_reg'].mean(),
        all_losses_dict['loss_objectness'].mean()
    ))
In [23]:
num_epochs = 15
# Full training run; per-epoch mean losses are printed by train_one_epoch
for epoch in range(num_epochs):
    train_one_epoch(model, optimizer, train_loader, device, epoch)
    # Assuming lr_scheduler is active
    # lr_scheduler.step()
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:27<00:00, 8.31it/s]
Epoch 0, lr: 0.010000, loss: 0.644064, loss_classifier: 0.269256, loss_box: 0.280233, loss_rpn_box: 0.036856, loss_object: 0.057719
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:24<00:00, 9.37it/s]
Epoch 1, lr: 0.010000, loss: 0.597864, loss_classifier: 0.243627, loss_box: 0.290089, loss_rpn_box: 0.029598, loss_object: 0.034549
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:23<00:00, 9.76it/s]
Epoch 2, lr: 0.010000, loss: 0.597391, loss_classifier: 0.235885, loss_box: 0.307767, loss_rpn_box: 0.027909, loss_object: 0.025829
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:23<00:00, 9.95it/s]
Epoch 3, lr: 0.010000, loss: 0.597395, loss_classifier: 0.228864, loss_box: 0.323123, loss_rpn_box: 0.023719, loss_object: 0.021689
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:22<00:00, 10.11it/s]
Epoch 4, lr: 0.010000, loss: 0.589719, loss_classifier: 0.224530, loss_box: 0.322727, loss_rpn_box: 0.022882, loss_object: 0.019580
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:22<00:00, 10.32it/s]
Epoch 5, lr: 0.010000, loss: 0.609750, loss_classifier: 0.231716, loss_box: 0.339191, loss_rpn_box: 0.022100, loss_object: 0.016742
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:22<00:00, 10.15it/s]
Epoch 6, lr: 0.010000, loss: 0.582885, loss_classifier: 0.221081, loss_box: 0.322035, loss_rpn_box: 0.022316, loss_object: 0.017454
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:21<00:00, 10.48it/s]
Epoch 7, lr: 0.010000, loss: 0.601019, loss_classifier: 0.213936, loss_box: 0.354294, loss_rpn_box: 0.020831, loss_object: 0.011958
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:21<00:00, 10.49it/s]
Epoch 8, lr: 0.010000, loss: 0.596454, loss_classifier: 0.214189, loss_box: 0.349867, loss_rpn_box: 0.019790, loss_object: 0.012608
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:21<00:00, 10.53it/s]
Epoch 9, lr: 0.010000, loss: 0.574125, loss_classifier: 0.200085, loss_box: 0.345054, loss_rpn_box: 0.018266, loss_object: 0.010719
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:21<00:00, 10.81it/s]
Epoch 10, lr: 0.010000, loss: 0.580293, loss_classifier: 0.207746, loss_box: 0.345664, loss_rpn_box: 0.017131, loss_object: 0.009752
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:21<00:00, 10.67it/s]
Epoch 11, lr: 0.010000, loss: 0.547217, loss_classifier: 0.193005, loss_box: 0.322140, loss_rpn_box: 0.018093, loss_object: 0.013978
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:21<00:00, 10.68it/s]
Epoch 12, lr: 0.010000, loss: 0.550718, loss_classifier: 0.187332, loss_box: 0.333705, loss_rpn_box: 0.017842, loss_object: 0.011839
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:21<00:00, 10.75it/s]
Epoch 13, lr: 0.010000, loss: 0.580478, loss_classifier: 0.197601, loss_box: 0.358301, loss_rpn_box: 0.016563, loss_object: 0.008013
100%|█████████████████████████████████████████████████████████████████████████████████| 230/230 [00:21<00:00, 10.52it/s]
Epoch 14, lr: 0.010000, loss: 0.509273, loss_classifier: 0.174541, loss_box: 0.309994, loss_rpn_box: 0.015732, loss_object: 0.009006
In [24]:
# Class names indexed by model label id.  Index alignment with the COCO
# category ids must be preserved (id 0 is the 'satelites' supercategory
# placeholder): the original rebuilt this list without index 0, shifting
# every predicted label by one and mislabeling every detection.
classes = ['satelites', 'Asteroid', 'Damage', 'Debris', 'Explosion', 'Satellite']

test_dataset = AquariumDetection(root=dataset_path, split="test", transforms=get_transforms(False))

model.eval()
torch.cuda.empty_cache()

num_images_to_display = 5
count_displayed = 0
# One threshold for BOTH boxes and labels: the original filtered boxes at
# 0.3 but labels at 0.2, producing the length mismatch it then patched over.
SCORE_THRESHOLD = 0.3

# Global per-class mask collections (consumed by later cells)
asteroid_masks = []
damage_masks = []
debris_masks = []
explosion_masks = []
satellite_masks = []

for i in range(len(test_dataset)):
    img, _ = test_dataset[i]
    with torch.no_grad():
        prediction = model([img.to(device)])
        pred = prediction[0]

    if count_displayed < num_images_to_display and len(pred['labels']) > 0:
        keep = pred['scores'] > SCORE_THRESHOLD
        boxes_to_plot = pred['boxes'][keep]
        labels_to_plot = [classes[idx] for idx in pred['labels'][keep].tolist()]

        # (img * 255).to(...) avoids torch.tensor(tensor)'s copy warning
        img_int = (img * 255).to(torch.uint8)
        fig, ax = plt.subplots(1, 1, figsize=(10, 8))
        ax.imshow(draw_bounding_boxes(img_int, boxes_to_plot, labels_to_plot, width=4).permute(1, 2, 0))
        ax.set_title('Image with Bounding Boxes')
        ax.axis('off')
        plt.show()

        # One binary mask per detection for THIS image.  The original
        # displayed the global per-class lists, so each new image re-showed
        # every mask from all previous images; we display only this image's
        # masks while still extending the global lists for later cells.
        image_masks = []
        for bbox, label in zip(boxes_to_plot, labels_to_plot):
            xmin, ymin, xmax, ymax = map(int, bbox.tolist())
            mask = np.zeros((img_int.shape[1], img_int.shape[2]), dtype=np.uint8)
            mask[ymin:ymax, xmin:xmax] = 1
            image_masks.append(mask)
            if label == 'Asteroid':
                asteroid_masks.append(mask)
            elif label == 'Damage':
                damage_masks.append(mask)
            elif label == 'Debris':
                debris_masks.append(mask)
            elif label == 'Explosion':
                explosion_masks.append(mask)
            elif label == 'Satellite':
                satellite_masks.append(mask)

        if image_masks:  # guard: subplots(1, 1) would not be indexable
            fig, axes = plt.subplots(1, len(image_masks) + 1, figsize=(20, 8))
            axes[0].imshow(img_int.permute(1, 2, 0))
            axes[0].set_title('Original Image')
            axes[0].axis('off')
            for j, mask in enumerate(image_masks):
                axes[j + 1].imshow(mask, cmap='gray')
                axes[j + 1].set_title(f'Mask {j+1}')
                axes[j + 1].axis('off')
            plt.tight_layout()
            plt.show()

        count_displayed += 1
        if count_displayed >= num_images_to_display:
            break
loading annotations into memory... Done (t=0.00s) creating index... index created!
In [25]:
import numpy as np
import matplotlib.pyplot as plt
import cv2

# Visualize detection masks next to shadow masks and paint a marked
# "hazard map".  Depends on variables leaked from earlier cells: 'img_int'
# (image tensor), 'masks' (list of masks), 'shadow_masks', 'satellite_masks'.
# NOTE(review): 'masks' is never defined anywhere in this notebook, so the
# placeholder below always triggers and the plotting branch never runs
# (hence the "Masks are not available." output).

# Check if masks are available
if 'masks' not in locals():
    masks = []  # Placeholder for masks
if 'shadow_masks' not in locals():
    shadow_masks = []  # Placeholder for shadow masks
if 'satellite_masks' not in locals():
    satellite_masks = []  # Placeholder for satellite masks

# Display the image with bounding boxes and masks
if masks:  # Check if masks are defined
    fig, axes = plt.subplots(1, len(masks) + 2, figsize=(20, 8))
    # Display the original image with bounding boxes
    axes[0].imshow(img_int.permute(1, 2, 0))
    axes[0].set_title('Original Image with Bounding Boxes')
    axes[0].axis('off')
    # Display individual masks
    for j, mask in enumerate(masks):
        axes[j + 1].imshow(mask, cmap='gray')
        axes[j + 1].set_title(f'Mask {j+1}')
        axes[j + 1].axis('off')
    # Display the satellite masks
    # NOTE(review): only len(masks) + 2 axes exist, so these indices collide
    # with the combined-mask slot (axes[-1]) whenever satellite_masks is
    # non-empty — confirm the intended layout if this branch is ever reached.
    for j, mask in enumerate(satellite_masks):
        axes[len(masks) + j + 1].imshow(mask, cmap='gray')
        axes[len(masks) + j + 1].set_title(f'Satellite Mask {j+1}')
        axes[len(masks) + j + 1].axis('off')
    # You can also combine and display all masks in one plot
    combined_masks = np.zeros_like(masks[0]) if masks else np.zeros((img_int.shape[1], img_int.shape[2]), dtype=np.uint8)
    for mask in masks:
        combined_masks += mask  # Add masks together to create a combined mask
    axes[-1].imshow(combined_masks, cmap='gray')  # Display the combined mask
    axes[-1].set_title('Combined Masks')
    axes[-1].axis('off')
    plt.tight_layout()
    plt.show()
else:
    print("Masks are not available.")

# Resize the shadow masks to match image dimensions
# NOTE(review): shadow_masks come from *training* images while
# satellite_masks come from *test* detections; zip() below pairs them
# positionally — verify that pairing is intentional.
resized_shadow_masks = [cv2.resize(mask, (img_int.shape[2], img_int.shape[1])) for mask in shadow_masks]
# Resize the satellite masks to match image dimensions
resized_satellite_masks = [cv2.resize(mask, (img_int.shape[2], img_int.shape[1])) for mask in satellite_masks]

# Combine shadow and satellite masks
combined_masks = []
for shadow_mask, satellite_mask in zip(resized_shadow_masks, resized_satellite_masks):
    # Element-wise maximum acts as a logical OR for 0/255 masks
    combined_mask = np.maximum(shadow_mask, satellite_mask)
    combined_masks.append(combined_mask)

# Define colors for marking shadow and non-shadow areas
green = (0, 255, 0)  # Green color for shadow areas
red = (255, 0, 0)  # Red color for non-shadow areas

# Iterate over each combined mask
for combined_mask in combined_masks:
    # Create an RGB image to visualize the marked regions
    marked_image = cv2.cvtColor(combined_mask, cv2.COLOR_GRAY2RGB)
    # Iterate over each pixel in the combined mask
    # NOTE(review): per-pixel Python loops are very slow; boolean indexing
    # (marked_image[combined_mask == 255] = green) would be equivalent.
    for i in range(marked_image.shape[0]):
        for j in range(marked_image.shape[1]):
            # Check if the pixel corresponds to a shadow area
            if combined_mask[i, j] == 255:  # Assuming shadow areas are marked as 255 in the combined mask
                marked_image[i, j] = green  # Mark shadow areas as green
            else:
                marked_image[i, j] = red  # Mark non-shadow areas as red
    # Display the marked image
    plt.imshow(marked_image)
    plt.title('Marked Hazard Map')
    plt.axis('off')
    plt.show()
Masks are not available.
In [65]:
import imageio
from IPython.display import display, Image

# Read the input GIF as a list of frames (numpy arrays)
input_gif_path = 'KH95.gif'
input_gif = imageio.mimread(input_gif_path)

# Display the input GIF
display(Image(filename=input_gif_path))


def apply_hazard_map(frame):
    """Threshold one GIF frame into a red-on-black hazard map.

    Pixels whose grayscale intensity exceeds 127 are treated as hazards
    and painted red; everything else stays black.

    Parameters
    ----------
    frame : np.ndarray
        Frame from imageio.mimread.  GIF frames frequently decode as RGBA,
        which would make cv2.COLOR_RGB2GRAY raise, so only the first three
        channels are used.

    Returns
    -------
    np.ndarray
        RGB uint8 image with hazard areas marked in red.
    """
    # Drop a possible alpha channel; ensure a contiguous uint8 array for cv2
    rgb = np.ascontiguousarray(np.asarray(frame)[..., :3], dtype=np.uint8)
    gray_frame = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
    # Apply a simple threshold to create a binary hazard map
    _, binary_map = cv2.threshold(gray_frame, 127, 255, cv2.THRESH_BINARY)
    # Create an RGB image to visualize the hazard map
    hazard_map = cv2.cvtColor(binary_map, cv2.COLOR_GRAY2RGB)
    # Mark hazard areas in red
    hazard_map[binary_map == 255] = (255, 0, 0)
    return hazard_map


# Process each frame of the input GIF into a hazard-mapped frame
output_frames = [apply_hazard_map(frame) for frame in input_gif]

# Save the hazard-mapped frames as a new GIF
output_gif_path = 'output.gif'
imageio.mimsave(output_gif_path, output_frames)

# Display the hazard-mapped GIF
display(Image(filename=output_gif_path))
<IPython.core.display.Image object>
<IPython.core.display.Image object>
In [27]:
import numpy as np
import matplotlib.pyplot as plt
import cv2

# Near-duplicate of the earlier hazard-map cell with the color scheme
# inverted (shadow -> red, non-shadow -> green) and the marked image built
# on a black canvas instead of the grayscale mask.
# NOTE(review): consider extracting the shared logic into a function
# instead of keeping two ~70-line copies in sync.
# Depends on variables leaked from earlier cells: 'img_int', 'masks',
# 'shadow_masks', 'satellite_masks'.  'masks' is never defined anywhere in
# this notebook, so the plotting branch below never runs.

# Check if masks are available
if 'masks' not in locals():
    masks = []  # Placeholder for masks
if 'shadow_masks' not in locals():
    shadow_masks = []  # Placeholder for shadow masks
if 'satellite_masks' not in locals():
    satellite_masks = []  # Placeholder for satellite masks

# Display the image with bounding boxes and masks
if masks:  # Check if masks are defined
    fig, axes = plt.subplots(1, len(masks) + 2, figsize=(20, 8))
    # Display the original image with bounding boxes
    axes[0].imshow(img_int.permute(1, 2, 0))
    axes[0].set_title('Original Image with Bounding Boxes')
    axes[0].axis('off')
    # Display individual masks
    for j, mask in enumerate(masks):
        axes[j + 1].imshow(mask, cmap='gray')
        axes[j + 1].set_title(f'Mask {j+1}')
        axes[j + 1].axis('off')
    # Display the satellite masks
    # NOTE(review): these indices collide with the combined-mask slot
    # (axes[-1]) when satellite_masks is non-empty — confirm intended layout.
    for j, mask in enumerate(satellite_masks):
        axes[len(masks) + j + 1].imshow(mask, cmap='gray')
        axes[len(masks) + j + 1].set_title(f'Satellite Mask {j+1}')
        axes[len(masks) + j + 1].axis('off')
    # You can also combine and display all masks in one plot
    combined_masks = np.zeros_like(masks[0]) if masks else np.zeros((img_int.shape[1], img_int.shape[2]), dtype=np.uint8)
    for mask in masks:
        combined_masks += mask  # Add masks together to create a combined mask
    axes[-1].imshow(combined_masks, cmap='gray')  # Display the combined mask
    axes[-1].set_title('Combined Masks')
    axes[-1].axis('off')
    plt.tight_layout()
    plt.show()
else:
    print("Masks are not available.")

# Resize the shadow masks to match image dimensions
resized_shadow_masks = [cv2.resize(mask, (img_int.shape[2], img_int.shape[1])) for mask in shadow_masks]
# Resize the satellite masks to match image dimensions
resized_satellite_masks = [cv2.resize(mask, (img_int.shape[2], img_int.shape[1])) for mask in satellite_masks]

# Combine shadow and satellite masks
combined_masks = []
for shadow_mask, satellite_mask in zip(resized_shadow_masks, resized_satellite_masks):
    # Element-wise maximum acts as a logical OR for 0/255 masks
    combined_mask = np.maximum(shadow_mask, satellite_mask)
    combined_masks.append(combined_mask)

# Define marker colors (note: this cell uses the OPPOSITE mapping from the
# earlier hazard-map cell — here shadow areas are painted red)
green = (0, 255, 0)  # Green color, used here for NON-shadow areas
red = (255, 0, 0)  # Red color, used here for shadow areas

# Iterate over each combined mask
for combined_mask in combined_masks:
    # Black RGB canvas to visualize the marked regions
    marked_image = np.zeros((combined_mask.shape[0], combined_mask.shape[1], 3), dtype=np.uint8)
    # Iterate over each pixel in the combined mask
    # NOTE(review): boolean indexing would replace this slow per-pixel loop.
    for i in range(marked_image.shape[0]):
        for j in range(marked_image.shape[1]):
            # Check if the pixel corresponds to a shadow area
            if combined_mask[i, j] == 255:  # Assuming shadow areas are marked as 255 in the combined mask
                marked_image[i, j] = red  # Mark shadow areas as red
            else:
                marked_image[i, j] = green  # Mark non-shadow areas as green
    # Display the marked image
    plt.imshow(marked_image)
    plt.title('Marked Hazard Map')
    plt.axis('off')
    plt.show()
Masks are not available.
In [49]:
import requests
import matplotlib.pyplot as plt
import datetime
import os


def fetch_cme_data(api_key, start_date, end_date):
    """Fetch CME analysis records from NASA's DONKI API.

    Raises requests.HTTPError on a non-2xx response instead of silently
    returning an error payload that would break downstream parsing.
    """
    url = f"https://api.nasa.gov/DONKI/CMEAnalysis?startDate={start_date}&endDate={end_date}&api_key={api_key}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()


def fetch_geomagnetic_storm_data(api_key, start_date, end_date, location):
    """Fetch geomagnetic storm (GST) records from NASA's DONKI API."""
    url = f"https://api.nasa.gov/DONKI/GST?startDate={start_date}&endDate={end_date}&location={location}&api_key={api_key}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()


def plot_cme_and_storm_data(cme_data, storm_data):
    """Plot CME speed and storm intensity on one shared time axis."""
    cme_times = [datetime.datetime.strptime(entry['startTime'], "%Y-%m-%dT%H:%MZ") for entry in cme_data]
    cme_intensity = [entry['cmeSpeed'] for entry in cme_data]
    storm_times = [datetime.datetime.strptime(entry['time_tag'], "%Y-%m-%dT%H:%MZ") for entry in storm_data]
    # NOTE(review): 'gstID' looks like an identifier rather than an
    # intensity — confirm the intended field (e.g. Kp index) in DONKI.
    storm_intensity = [entry['gstID'] for entry in storm_data]

    plt.figure(figsize=(10, 6))
    plt.plot(cme_times, cme_intensity, label='CME Intensity', marker='o')
    plt.plot(storm_times, storm_intensity, label='Geomagnetic Storm Intensity', marker='x')
    plt.xlabel('Time')
    plt.ylabel('Intensity')
    plt.title('Coronal Mass Ejections vs Geomagnetic Storms')
    plt.legend()
    plt.xticks(rotation=45)
    plt.grid(True)
    plt.tight_layout()
    plt.show()


if __name__ == "__main__":
    # Never hardcode API keys in source; read from the environment and fall
    # back to NASA's public demo key (the original embedded a real key).
    api_key = os.environ.get("NASA_API_KEY", "DEMO_KEY")
    start_date = "2024-01-01"
    end_date = "2024-04-01"
    location = "earth"

    # Sample CME data with start time (the live fetchers above exist, but
    # this cell intentionally plots canned sample data)
    sample_cme_data = [
        {"startTime": "2024-01-01T12:00Z", "cmeSpeed": 500},
        {"startTime": "2024-02-01T15:30Z", "cmeSpeed": 600},
        {"startTime": "2024-03-01T08:45Z", "cmeSpeed": 550}
    ]
    # Sample geomagnetic storm data with time tag
    sample_storm_data = [
        {"time_tag": "2024-01-05T14:00Z", "gstID": 3},
        {"time_tag": "2024-02-10T10:30Z", "gstID": 4},
        {"time_tag": "2024-03-20T17:15Z", "gstID": 5}
    ]

    plot_cme_and_storm_data(sample_cme_data, sample_storm_data)
In [54]:
import requests
import matplotlib.pyplot as plt
import os

# NOTE(review): fetch_geomagnetic_storm_data is (re)defined in several
# cells; later definitions silently shadow earlier ones.


def fetch_solar_flare_data(api_key, start_date, end_date):
    """Fetch solar flare (FLR) records from NASA's DONKI API.

    Raises requests.HTTPError on a non-2xx response.
    """
    url = f"https://api.nasa.gov/DONKI/FLR?startDate={start_date}&endDate={end_date}&api_key={api_key}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()


def fetch_geomagnetic_storm_data(api_key, start_date, end_date, location):
    """Fetch geomagnetic storm (GST) records from NASA's DONKI API."""
    url = f"https://api.nasa.gov/DONKI/GST?startDate={start_date}&endDate={end_date}&location={location}&api_key={api_key}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()


def plot_intensity_correlation(solar_flare_data, geomagnetic_storm_data):
    """Plot the flare count against the storm count as a single point."""
    flare_count = len(solar_flare_data)
    storm_count = len(geomagnetic_storm_data)

    plt.figure(figsize=(8, 6))
    plt.plot([flare_count], [storm_count], 'bo')
    plt.xlabel('Number of Solar Flares')
    plt.ylabel('Number of Geomagnetic Storms')
    plt.title('Correlation between Number of Solar Flares and Geomagnetic Storms')
    plt.grid(True)
    plt.tight_layout()
    plt.show()


if __name__ == "__main__":
    # API key from the environment — never hardcoded in the notebook
    api_key = os.environ.get("NASA_API_KEY", "DEMO_KEY")
    start_date = "2024-01-01"
    end_date = "2024-04-01"
    location = "earth"

    solar_flare_data = fetch_solar_flare_data(api_key, start_date, end_date)
    geomagnetic_storm_data = fetch_geomagnetic_storm_data(api_key, start_date, end_date, location)

    plot_intensity_correlation(solar_flare_data, geomagnetic_storm_data)
In [57]:
import requests
import matplotlib.pyplot as plt
import os


def fetch_solar_flare_data(api_key, start_date, end_date):
    """Fetch solar flare (FLR) records from NASA's DONKI API.

    Raises requests.HTTPError on a non-2xx response.
    """
    url = f"https://api.nasa.gov/DONKI/FLR?startDate={start_date}&endDate={end_date}&api_key={api_key}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()


def fetch_geomagnetic_storm_data(api_key, start_date, end_date, location):
    """Fetch geomagnetic storm (GST) records from NASA's DONKI API."""
    url = f"https://api.nasa.gov/DONKI/GST?startDate={start_date}&endDate={end_date}&location={location}&api_key={api_key}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()


def plot_solar_flare_intensity_over_time(solar_flare_data):
    """Panel 1: flare class strings over raw time strings.

    NOTE(review): 'classType' is a string like 'M1.5' and times are raw
    strings — both plot as categorical axes; confirm whether numeric
    parsing was intended.
    """
    times = [entry['beginTime'] for entry in solar_flare_data]
    intensities = [entry['classType'] for entry in solar_flare_data]
    plt.subplot(3, 1, 1)
    plt.plot(times, intensities, marker='o')
    plt.xlabel('Time')
    plt.ylabel('Solar Flare Intensity')
    plt.title('Solar Flare Intensity over Time')


def plot_geomagnetic_storm_intensity_over_time(geomagnetic_storm_data):
    """Panel 2: storm 'gstID' over raw time strings (see NOTE above)."""
    times = [entry['startTime'] for entry in geomagnetic_storm_data]
    intensities = [entry['gstID'] for entry in geomagnetic_storm_data]
    plt.subplot(3, 1, 2)
    plt.plot(times, intensities, marker='x')
    plt.xlabel('Time')
    plt.ylabel('Geomagnetic Storm Intensity')
    plt.title('Geomagnetic Storm Intensity over Time')


def plot_intensity_correlation(solar_flare_data, geomagnetic_storm_data):
    """Panel 3: flare count vs storm count as a single point."""
    flare_count = len(solar_flare_data)
    storm_count = len(geomagnetic_storm_data)
    plt.subplot(3, 1, 3)
    plt.plot([flare_count], [storm_count], 'bo')
    plt.xlabel('Number of Solar Flares')
    plt.ylabel('Number of Geomagnetic Storms')
    plt.title('Correlation between Number of Solar Flares and Geomagnetic Storms')
    plt.tight_layout()


if __name__ == "__main__":
    # API key from the environment — never hardcoded in the notebook
    api_key = os.environ.get("NASA_API_KEY", "DEMO_KEY")
    start_date = "2024-01-01"
    end_date = "2024-04-01"
    location = "earth"

    solar_flare_data = fetch_solar_flare_data(api_key, start_date, end_date)
    geomagnetic_storm_data = fetch_geomagnetic_storm_data(api_key, start_date, end_date, location)

    plt.figure(figsize=(10, 12))
    plot_solar_flare_intensity_over_time(solar_flare_data)
    plot_geomagnetic_storm_intensity_over_time(geomagnetic_storm_data)
    plot_intensity_correlation(solar_flare_data, geomagnetic_storm_data)
    plt.show()
In [63]:
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error
from sklearn.impute import SimpleImputer

# Load data from CSV file
data = pd.read_csv('Hackathon dataset/hardware_degradation_data.csv')

# Data Preprocessing
data['timestamp'] = pd.to_datetime(data['timestamp'])
data['time_between_failures'] = data['timestamp'].diff().dt.total_seconds()
# NOTE(review): 'repair_time' is computed from exactly the same diff as
# 'time_between_failures', so the two columns are identical.  That makes
# the regression below fit y = x perfectly (hence MAE 0.0 in the output)
# and forces OEE to exactly 50% — confirm which source column repair time
# should actually come from.
data['repair_time'] = data['timestamp'].diff().dt.total_seconds()

# Impute missing values in 'time_between_failures' and 'repair_time' using mean
# (the diff() above leaves NaN in the first row)
imputer = SimpleImputer(strategy='mean')
data['time_between_failures'] = imputer.fit_transform(data[['time_between_failures']])
data['repair_time'] = imputer.fit_transform(data[['repair_time']])

# Calculate MTBF, MTTR, OEE after handling missing values
mtbf = data['time_between_failures'].mean()  # mean time between failures (seconds)
mttr = data['repair_time'].mean()  # mean time to repair (seconds)
oee = (mtbf / (mtbf + mttr)) * 100 # Calculate Overall Equipment Effectiveness as a percentage

# Model training (example using linear regression)
# NOTE(review): this rebinds `model`, shadowing the Faster R-CNN detector
# from the earlier cells — rename one of them if both are still needed.
X = data[['time_between_failures']]
y = data['repair_time']
model = LinearRegression()
model.fit(X, y)

# Predict repair time for new data
new_data = pd.DataFrame({'time_between_failures': [100, 200]})
predicted_repair_time = model.predict(new_data)

# Evaluate model performance
mae = mean_absolute_error(data['repair_time'], model.predict(X))
print(f'Mean Absolute Error: {mae}')

# Print performance metrics
print(f'MTBF: {mtbf} seconds')
print(f'MTTR: {mttr} seconds')
print(f'OEE: {oee}%')
Mean Absolute Error: 0.0 MTBF: 3600.0 seconds MTTR: 3600.0 seconds OEE: 50.0%
In [ ]:
In [ ]: